KAFKA-17747: Add compute topic and group hash #47
Conversation
Signed-off-by: PoAn Yang <[email protected]>
Summary of Changes
Hello @visz11, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces new hashing utilities for topics and groups within the group coordinator.
Walkthrough
Added Guava as a dependency and an import allowance. Introduced deterministic hashing utilities in Group for topics and groups using Murmur3, leveraging cluster and topic metadata. Added comprehensive unit tests validating hashing stability and sensitivity. Updated Gradle dependency catalogs and versions, and checkstyle import controls.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (2 warnings) · ✅ Passed checks (1 passed)
KAFKA-17747: Add compute topic and group hash
TL;DR: Adds hash computation utilities for topics and groups using Guava's Murmur3 hashing, with comprehensive test coverage.
PR Summary
Implements deterministic hash computation methods for Kafka topics and consumer groups using Google Guava's Murmur3 algorithm.
Change Highlights
Sequence Diagram
sequenceDiagram
participant C as Coordinator
participant G as Group
participant T as TopicImage
participant CI as ClusterImage
participant H as Hasher
C->>G: computeTopicHash(topicImage, clusterImage)
G->>H: newHasher()
G->>H: putByte(0) magic
G->>T: getId(), getName(), partitions()
T-->>G: topic metadata
G->>H: putLong(id), putString(name), putInt(partitions)
G->>CI: getBrokerRacks()
CI-->>G: rack information
G->>H: putInt(partitionId), putString(racks)
H-->>G: hash.asLong()
C->>G: computeGroupHash(topicHashes)
G->>H: combineOrdered(sortedHashes)
H-->>C: combined hash
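The diagram's final combination step never appears as code in this thread, so here is a minimal sketch of it, assuming computeGroupHash receives a map of topic name to topic hash (the exact signature is not shown on this page):

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class GroupHashSketch {
    // Combines per-topic hashes into one group hash. TreeMap iterates in
    // topic-name order, so the combined result is deterministic.
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<HashCode> ordered = new TreeMap<>(topicHashes).values().stream()
            .map(HashCode::fromLong)
            .collect(Collectors.toList());
        // Hashing.combineOrdered requires at least one hash code.
        return Hashing.combineOrdered(ordered).asLong();
    }
}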
Testing Guide
/refacto-visz
Code Review
This pull request introduces hashing functions for topics and groups using the Guava library, which is a valuable addition for tracking metadata changes. The implementation is sound, and the accompanying tests are thorough. I have a few suggestions to enhance the robustness and maintainability of the new code. My recommendations include using a more appropriate hashing method for UUIDs, simplifying a stream operation, and refactoring some duplicated code in the tests.
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
Using topicImage.id().hashCode() for hashing the Uuid is not ideal, as it truncates the 128-bit UUID to a 32-bit integer hash, increasing the potential for collisions. Feeding both halves of the UUID into the hasher via putLong(getMostSignificantBits()) and putLong(getLeastSignificantBits()) uses the full 128 bits and makes the hash more robust.
Please note that this change will require updating the corresponding logic in GroupTest.java.

-     .putLong(topicImage.id().hashCode()) // topic Id
+     .putLong(topicImage.id().getMostSignificantBits()) // topic Id (MSB)
+     .putLong(topicImage.id().getLeastSignificantBits()) // topic Id (LSB)
    .filter(Optional::isPresent)
    .map(Optional::get)
The pattern .filter(Optional::isPresent).map(Optional::get) can be simplified to .flatMap(Optional::stream) since your project's Java version supports it. This change will make the stream pipeline more concise and idiomatic.
-     .filter(Optional::isPresent)
-     .map(Optional::get)
+     .flatMap(Optional::stream)
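For illustration, a small self-contained sketch of the simplification (the rack values are made up, not the PR's fixtures):

import java.util.List;
import java.util.Optional;

public final class OptionalStreamDemo {
    public static void main(String[] args) {
        List<Optional<String>> racks =
            List.of(Optional.of("rack0"), Optional.empty(), Optional.of("rack1"));

        // Verbose form: drop empty Optionals, then unwrap the rest.
        List<String> viaFilterMap = racks.stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .toList();

        // Idiomatic since Java 9: Optional.stream() yields zero or one elements.
        List<String> viaFlatMap = racks.stream()
            .flatMap(Optional::stream)
            .toList();

        System.out.println(viaFilterMap.equals(viaFlatMap)); // prints true
    }
}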
@Test
void testComputeTopicHash() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertEquals(topicHasher.hash().asLong(), result);
}
There is significant code duplication across the testComputeTopicHash* methods. The logic for building the hash is repeated in testComputeTopicHash, testComputeTopicHashWithDifferentMagicByte, testComputeTopicHashWithDifferentPartitionOrder, and testComputeTopicHashWithDifferentRackOrder. This makes the tests brittle and harder to maintain, as any change to the hashing logic in Group.computeTopicHash would require updates in multiple test methods.
Consider refactoring this logic into a private helper method within the test class. This would centralize the hash construction, making the tests cleaner and more maintainable.
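A sketch of such a helper, reusing the test's existing constants (the method and parameter names are illustrative, not from the PR):

// Centralizes expected-hash construction; each test then varies only the inputs.
private static long expectedTopicHash(byte magicByte,
                                      int firstPartition, String firstRacks,
                                      int secondPartition, String secondRacks) {
    return Hashing.murmur3_128().newHasher()
        .putByte(magicByte) // magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(firstPartition)
        .putString(firstRacks, StandardCharsets.UTF_8)
        .putInt(secondPartition)
        .putString(secondRacks, StandardCharsets.UTF_8)
        .hash()
        .asLong();
}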
| <allow pkg="org.apache.kafka.coordinator.common.runtime" /> | ||
| <allow pkg="com.google.re2j" /> | ||
| <allow pkg="org.apache.kafka.metadata" /> | ||
| <allow pkg="com.google.common.hash" /> |
Suggestion: Narrow the permission surface by marking the package as exact-match to avoid accidentally allowing subpackages under com.google.common.hash. [security]
| <allow pkg="com.google.common.hash" /> | |
| <allow pkg="com.google.common.hash" exact-match="true" /> |
Why Change? ⭐
The proposed change only adds the attribute exact-match="true" which is already used elsewhere in this import-control file (e.g. other <allow ... exact-match="true" /> lines), so the attribute is supported by the DTD referenced at the top of the file and will not introduce a syntax error.
This modification narrows the permission surface by allowing only the exact package com.google.common.hash and not its subpackages. It is syntactically valid XML and consistent with the surrounding entries. Assumptions:
- The import-control DTD used by this project supports the exact-match attribute (evidence: other lines in this file use it).
- No runtime code requires implicit access to subpackages under com.google.common.hash; if such access is required, this change would be a deliberate tightening of permissions rather than a syntactic bug.
Given these points, the change is safe from a syntax/execution perspective and does not introduce errors into the configuration itself.
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
Suggestion: Use a deterministic string representation of the topic id (e.g. id().toString()) instead of hashCode() when feeding the hasher to avoid collisions and ensure consistent hashing across JVMs. [possible bug]
-     .putLong(topicImage.id().hashCode()) // topic Id
+     .putString(topicImage.id().toString(), StandardCharsets.UTF_8) // topic Id as string
Why Change? ⭐
The improved code is syntactically correct and uses only symbols present in the file:
- Hasher.putString(CharSequence, Charset) is a valid Guava Hasher API, and StandardCharsets is already imported.
- topicImage.id().toString() is a safe replacement for topicImage.id().hashCode(): it produces a deterministic, textual representation of the id (typically a UUID or equivalent), avoiding reliance on JVM-specific hashCode implementations and reducing collision risk.
- The change does not introduce additional null-safety concerns beyond the original (both .hashCode() and .toString() would NPE if id() is null).
Assumptions and rationale:
- topicImage.id() has a meaningful, stable toString() (true for UUIDs and Kafka Uuid types).
- The change intentionally alters the bytes fed into the hasher to a deterministic textual form; this is a compatible internal change to the hashing input and compiles without additional imports.
Given these verifiable points, and since the edit is local and uses existing imports/APIs, this suggestion is classified as verified.
Code Review: Group Coordinator Hash Implementation
👍 Well Done
    .map(Optional::get)
    .sorted()
    .collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
Missing Import Statement
Code uses Collectors.joining() but is missing an import for java.util.stream.Collectors. Without the import declaration, the stream operation cannot resolve Collectors and the file will fail to compile.
import java.util.stream.Collectors;
Committable Suggestion

- topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+ import java.util.stream.Collectors;
Standards
- Algorithm-Correctness-Dependency-Resolution
- Logic-Verification-Compilation-Safety
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
Hardcoded Magic Byte
The magic byte value is hardcoded as the constant zero, violating the organization guideline against hardcoding variables. This reduces hash-algorithm flexibility and prevents versioning of the hash computation logic.
Standards
- Org-Guideline-Hardcoding of variables
- CWE-547
- OWASP-A05
topicHasher.putInt(entry.getKey()); // partition id
String racks = Arrays.stream(entry.getValue().replicas)
    .mapToObj(clusterImage::broker)
    .filter(Objects::nonNull)
    .map(BrokerRegistration::rack)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .sorted()
    .collect(Collectors.joining(";"));
topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
});
Stream Processing Inefficiency
Nested stream processing creates multiple intermediate collections for each partition. The inner stream operations execute on every partition iteration, causing O(n*m) complexity where n = partitions and m = replicas. Performance degrades with partition count and replication factor, affecting hash computation scalability.
Standards
- ISO-IEC-25010-Performance-Efficiency-Time-Behavior
- Optimization-Pattern-Stream-Efficiency
- Algorithmic-Complexity-Nested-Operations
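One possible shape for such an optimization, sketched as a drop-in fragment under the assumption that ClusterImage#brokers() exposes the broker-id-to-registration map (as in Kafka's metadata image API): resolve each broker's rack once, then do O(1) lookups per replica.

// Resolve racks once per cluster image instead of once per replica occurrence.
Map<Integer, String> rackByBrokerId = new HashMap<>();
clusterImage.brokers().forEach((brokerId, registration) ->
    registration.rack().ifPresent(rack -> rackByBrokerId.put(brokerId, rack)));

topicImage.partitions().entrySet().stream()
    .sorted(Map.Entry.comparingByKey())
    .forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(rackByBrokerId::get) // O(1) map lookup per replica
            .filter(Objects::nonNull)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8);
    });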
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(topicImage.id().hashCode()) // topic Id
Magic Byte Documentation
The magic byte value lacks documentation explaining its purpose and versioning strategy. Future hash-algorithm changes may require different magic bytes for backward compatibility. Adding a constant with a descriptive name improves maintainability.
Standards
- Clean-Code-Comments
- Maintainability-Quality-Documentation
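A sketch of the suggested constant (the name is illustrative, not from the PR):

/** Version prefix for the topic hash input layout; bump when the format changes. */
private static final byte TOPIC_HASH_MAGIC_BYTE = (byte) 0;

The call site would then read topicHasher.putByte(TOPIC_HASH_MAGIC_BYTE), making the versioning intent explicit.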
Pull Request Feedback 🔍
Actionable comments posted: 4
🧹 Nitpick comments (1)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1)
119-160: Optional: add an invariance test for replica order. Consider a test that swaps replica order for a partition but preserves racks; the hash should stay the same since racks are sorted.
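A sketch of that test, assuming a hypothetical fixture builder imageWithReplicas(int[]) that produces a MetadataImage with the given replica assignment but identical racks (the real test would reuse whatever builds FOO_METADATA_IMAGE):

@Test
void testComputeTopicHashIgnoresReplicaOrder() {
    // Hypothetical helper: same brokers and racks, replicas listed in different orders.
    MetadataImage image = imageWithReplicas(new int[]{0, 1});
    MetadataImage reordered = imageWithReplicas(new int[]{1, 0});

    long hash = Group.computeTopicHash(
        image.topics().getTopic(FOO_TOPIC_ID), image.cluster());
    long reorderedHash = Group.computeTopicHash(
        reordered.topics().getTopic(FOO_TOPIC_ID), reordered.cluster());

    // Racks are sorted before hashing, so replica order must not affect the hash.
    assertEquals(hash, reorderedHash);
}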
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- build.gradle (1 hunk)
- checkstyle/import-control-group-coordinator.xml (1 hunk)
- gradle/dependencies.gradle (2 hunks)
- group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (2 hunks)
- group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (1 hunk)
🧰 Additional context used
🧬 Code graph analysis (2)
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (3)
- metadata/src/main/java/org/apache/kafka/image/ClusterImage.java (1): ClusterImage (36-108)
- metadata/src/main/java/org/apache/kafka/image/TopicImage.java (1): TopicImage (37-92)
- metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java (1): BrokerRegistration (44-398)
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)
- clients/src/main/java/org/apache/kafka/common/Uuid.java (1): Uuid (32-196)
- metadata/src/main/java/org/apache/kafka/image/MetadataImage.java (1): MetadataImage (33-200)
🔇 Additional comments (7)
build.gradle (1)
1423-1424: Guava dep addition looks correct; ensure LICENSE update. Dependency wiring is fine for group-coordinator. Please update LICENSE-binary to account for Guava and any transitive artifacts.
gradle/dependencies.gradle (2)
64-65: Version pin for Guava is fine; please confirm license artifacts. Adding versions.guava is consistent. Ensure binary LICENSE files are updated accordingly.
151-152: libs mapping OK. The libs.guava definition is correct and matches the version entry.
checkstyle/import-control-group-coordinator.xml (1)
80-81: Import allowance scoped appropriately. Allowing only com.google.common.hash is precise and aligned with usage.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (1)
229-237: Deterministic group hash over sorted topics: LGTM. Sorting by topic name and combining ordered 64-bit hashes is clear and stable.
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java (2)
162-188: Group hash tests: LGTM. They capture ordering sensitivity as intended.
50-65: Ignore suggested test update: Group.computeTopicHash still uses topicImage.id().hashCode(), so the expected hash in the test should remain based on .hashCode(). Likely an incorrect or invalid review comment.
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        .putLong(topicImage.id().hashCode()) // topic Id
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions

    topicImage.partitions().entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
        topicHasher.putInt(entry.getKey()); // partition id
        String racks = Arrays.stream(entry.getValue().replicas)
            .mapToObj(clusterImage::broker)
            .filter(Objects::nonNull)
            .map(BrokerRegistration::rack)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .sorted()
            .collect(Collectors.joining(";"));
        topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
    });
    return topicHasher.hash().asLong();
}
Use full 128-bit UUID; avoid ambiguous rack concatenation.
- Bug: putLong(topicImage.id().hashCode()) collapses 128-bit UUID to 32-bit int, increasing collision risk. Use most/least bits.
- Improvement: Joining racks with ";" can collide if rack strings contain ";" (e.g., "a;" + "b" vs "a" + ";b"). Encode lengths instead.
Apply this essential fix for UUID handling:

- Hasher topicHasher = hf.newHasher()
-     .putByte((byte) 0) // magic byte
-     .putLong(topicImage.id().hashCode()) // topic Id
-     .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
-     .putInt(topicImage.partitions().size()); // number of partitions
+ Hasher topicHasher = hf.newHasher()
+     .putByte((byte) 0) // magic byte
+     // topic id: use full 128 bits to avoid collisions
+     .putLong(topicImage.id().getMostSignificantBits())
+     .putLong(topicImage.id().getLeastSignificantBits())
+     .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
+     .putInt(topicImage.partitions().size()); // number of partitions

Optionally harden rack encoding to avoid separator ambiguity:
- String racks = Arrays.stream(entry.getValue().replicas)
-     .mapToObj(clusterImage::broker)
-     .filter(Objects::nonNull)
-     .map(BrokerRegistration::rack)
-     .filter(Optional::isPresent)
-     .map(Optional::get)
-     .sorted()
-     .collect(Collectors.joining(";"));
- topicHasher.putString(racks, StandardCharsets.UTF_8); // sorted racks with separator ";"
+ List<String> racks = Arrays.stream(entry.getValue().replicas)
+     .mapToObj(clusterImage::broker)
+     .filter(Objects::nonNull)
+     .map(BrokerRegistration::rack)
+     .flatMap(Optional::stream)
+     .sorted()
+     .toList();
+ topicHasher.putInt(racks.size());
+ for (String rack : racks) {
+     topicHasher.putInt(rack.length());
+     topicHasher.putString(rack, StandardCharsets.UTF_8);
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
static long computeTopicHash(TopicImage topicImage, ClusterImage clusterImage) {
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 0) // magic byte
        // topic id: use full 128 bits to avoid collisions
        .putLong(topicImage.id().getMostSignificantBits())
        .putLong(topicImage.id().getLeastSignificantBits())
        .putString(topicImage.name(), StandardCharsets.UTF_8) // topic name
        .putInt(topicImage.partitions().size()); // number of partitions
    topicImage.partitions().entrySet().stream()
        .sorted(Map.Entry.comparingByKey())
        .forEach(entry -> {
            topicHasher.putInt(entry.getKey()); // partition id
            // hardened rack encoding to avoid separator ambiguity
            List<String> racks = Arrays.stream(entry.getValue().replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .flatMap(Optional::stream)
                .sorted()
                .toList();
            topicHasher.putInt(racks.size());
            for (String rack : racks) {
                topicHasher.putInt(rack.length());
                topicHasher.putString(rack, StandardCharsets.UTF_8);
            }
        });
    return topicHasher.hash().asLong();
}
@Test
void testComputeTopicHashWithDifferentMagicByte() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID), FOO_METADATA_IMAGE.cluster());

    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 1) // different magic byte
        .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertNotEquals(topicHasher.hash().asLong(), result);
}
🛠️ Refactor suggestion | 🟠 Major
Apply same UUID change here.

-     .putByte((byte) 1) // different magic byte
-     .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+     .putByte((byte) 1) // different magic byte
+     .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+     .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)

📝 Committable suggestion
@Test
void testComputeTopicHashWithDifferentMagicByte() {
    long result = Group.computeTopicHash(FOO_METADATA_IMAGE.topics().getTopic(FOO_TOPIC_ID),
        FOO_METADATA_IMAGE.cluster());
    HashFunction hf = Hashing.murmur3_128();
    Hasher topicHasher = hf.newHasher()
        .putByte((byte) 1) // different magic byte
        .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
        .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
        .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
        .putInt(FOO_NUM_PARTITIONS) // number of partitions
        .putInt(0) // partition 0
        .putString("rack0;rack1", StandardCharsets.UTF_8) // rack of partition 0
        .putInt(1) // partition 1
        .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
    assertNotEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java around lines 67 to 82, the test builds a manual topic hasher using FOO_TOPIC_ID.hashCode(); update it to use the topic UUID components like the production code: replace the single putLong(FOO_TOPIC_ID.hashCode()) call with two calls, putLong(FOO_TOPIC_ID.getMostSignificantBits()) and putLong(FOO_TOPIC_ID.getLeastSignificantBits()), so the test uses the UUID's actual bits when computing the expected hash.
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    // different partition order
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
}
🛠️ Refactor suggestion | 🟠 Major
Apply same UUID change here.

-     .putByte((byte) 0) // magic byte
-     .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+     .putByte((byte) 0) // magic byte
+     .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+     .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)

📝 Committable suggestion
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    // different partition order
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8) // rack of partition 1
    .putInt(0) // partition 0
    .putString("rack0;rack1", StandardCharsets.UTF_8); // rack of partition 0
assertNotEquals(topicHasher.hash().asLong(), result);
🤖 Prompt for AI Agents
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java around lines 88 to 100, the test still uses FOO_TOPIC_ID.hashCode() when serializing the topic UUID; replace the single putLong(FOO_TOPIC_ID.hashCode()) with two explicit long writes for the UUID parts, putLong(FOO_TOPIC_ID.getMostSignificantBits()) followed by putLong(FOO_TOPIC_ID.getLeastSignificantBits()), so the UUID is serialized consistently with the other changes.
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}
🛠️ Refactor suggestion | 🟠 Major
Apply same UUID change here.

-     .putByte((byte) 0) // magic byte
-     .putLong(FOO_TOPIC_ID.hashCode()) // topic Id
+     .putByte((byte) 0) // magic byte
+     .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
+     .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)

📝 Committable suggestion
HashFunction hf = Hashing.murmur3_128();
Hasher topicHasher = hf.newHasher()
    .putByte((byte) 0) // magic byte
    .putLong(FOO_TOPIC_ID.getMostSignificantBits()) // topic Id (MSB)
    .putLong(FOO_TOPIC_ID.getLeastSignificantBits()) // topic Id (LSB)
    .putString(FOO_TOPIC_NAME, StandardCharsets.UTF_8) // topic name
    .putInt(FOO_NUM_PARTITIONS) // number of partitions
    .putInt(0) // partition 0
    .putString("rack1;rack0", StandardCharsets.UTF_8) // different rack order of partition 0
    .putInt(1) // partition 1
    .putString("rack1;rack2", StandardCharsets.UTF_8); // rack of partition 1
assertNotEquals(topicHasher.hash().asLong(), result);
}
🤖 Prompt for AI Agents
In group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupTest.java around lines 106 to 117, the test currently uses FOO_TOPIC_ID.hashCode() when feeding the UUID into the hasher; change this to encode the full UUID like elsewhere, replacing the single putLong(FOO_TOPIC_ID.hashCode()) with two putLong calls that write FOO_TOPIC_ID.getMostSignificantBits() and FOO_TOPIC_ID.getLeastSignificantBits() so the complete UUID is used in the hashing.
CodeAnt-AI Description
Add deterministic topic and group hashing to group coordinator
What Changed
- Added static computeTopicHash and computeGroupHash utilities to Group, hashing topic id, name, partition count, and sorted replica racks with Murmur3.
- Added Guava as a group-coordinator dependency and allowed com.google.common.hash in the checkstyle import controls.
- Added unit tests covering hash stability and sensitivity.
Impact
✅ Detect topic metadata changes
✅ Deterministic group hashes for metadata comparison
✅ Verified sensitivity to id/name/partition/rack and ordering changes
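For context, a minimal sketch of how a coordinator might consume these hashes for change detection (hypothetical wiring; assumes computeGroupHash takes a map of topic name to topic hash):

// Recompute the group hash from current metadata and compare with the stored value.
static boolean groupMetadataChanged(long storedGroupHash, Map<String, Long> currentTopicHashes) {
    return Group.computeGroupHash(currentTopicHashes) != storedGroupHash;
}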
Summary by CodeRabbit
New Features
- Deterministic Murmur3-based hash computation for topics and consumer groups in the group coordinator.
Tests
- Unit tests validating hash stability and sensitivity to topic id, name, partition, and rack changes.
Chores
- Added the Guava dependency, updated Gradle dependency catalogs, and extended checkstyle import controls.